﻿using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using Pfz.Caching;

namespace Pfz.Remoting
{
	/// <summary>
	/// Represents a "Channel" inside a StreamChanneller. This is used by the remoting
	/// mechanism to separate each thread communication channel inside a single tcp/ip
	/// connection.
	/// </summary>
	public sealed class StreamChannel:
		ExceptionAwareStream,
		IChannel,
		IGarbageCollectionAware
	{
		#region Private and internal fields
			// Local identifier of this channel. It is written into every frame header by Flush().
			// Not assigned in this class; presumably set by StreamChanneller - confirm.
			internal int _id;
			// Identifier of this channel on the remote host. _BeginDispose() sets it to -1, which
			// makes OnDispose() skip the _RemoveChannel call on the channeller.
			internal int _remoteId;
			
			// Received messages that Read() has not dequeued yet. Within this class it is only
			// touched while holding DisposeLock. Presumably filled by StreamChanneller - confirm.
			internal readonly Queue<byte[]> _inMessages = new Queue<byte[]>();
			// Set by _BeginDispose() when messages were still queued: writes are then rejected and
			// Read() disposes the channel once the queue has been drained.
			private bool _disposeBegan;
			// Message currently being consumed by Read(); null when the next Read() must dequeue.
			private byte[] _actualMessage;
			// Offset of the first unread byte inside _actualMessage.
			private int _positionInActualMessage;
			
			// Outgoing staging buffer, sized from channeller._channelBufferSize in the constructor.
			private readonly byte[] _sendBuffer;
			// Number of bytes in _sendBuffer that Write() has filled and Flush() has not sent yet.
			private int _sendBufferPosition;
		#endregion
		
		#region Constructor
			/// <summary>
			/// Creates a channel that belongs to the given channeller. The send buffer is
			/// sized from the channeller's channel buffer size and the new instance is
			/// registered for garbage-collection notifications.
			/// </summary>
			/// <param name="channeller">The channeller that owns this channel.</param>
			internal StreamChannel(StreamChanneller channeller)
			{
				// Allocate the staging buffer used by Write()/Flush().
				_sendBuffer = new byte[channeller._channelBufferSize];
				_channeller = channeller;

				// OnCollected() will be invoked through this registration.
				GCUtils.RegisterForCollectedNotification(this);
			}
		#endregion
		#region Dispose
			/// <summary>
			/// Releases the channel: unregisters from garbage-collection notifications,
			/// wakes every thread waiting on <c>DisposeLock</c> and detaches the channel
			/// from its channeller.
			/// </summary>
			/// <param name="disposing">True if called from Dispose()</param>
			protected override void OnDispose(bool disposing)
			{
				if (disposing)
				{
					GCUtils.UnregisterFromCollectedNotification(this);

					// Readers blocked in Read() wait on DisposeLock; wake them so they can
					// observe the disposed state.
					lock(DisposeLock)
					{
						Monitor.PulseAll(DisposeLock);
					}

					DetachFromChanneller();
				}

				base.OnDispose(disposing);
			}

			/// <summary>
			/// Clears the reference to the owning channeller and, if that channeller is
			/// still alive, either disposes it with this channel's dispose exception or
			/// asks it to remove this channel.
			/// </summary>
			private void DetachFromChanneller()
			{
				var owner = _channeller;
				if (owner == null)
					return;

				_channeller = null;

				lock(owner.DisposeLock)
				{
					if (owner.WasDisposed)
						return;

					var failure = DisposeException;
					if (failure != null)
					{
						// This channel died because of an error: dispose the channeller with it.
						owner.Dispose(failure);
					}
					else if (_remoteId != -1)
					{
						// _remoteId is -1 after _BeginDispose(); in that case no removal is requested.
						owner._RemoveChannel(_id, _remoteId);
					}
				}
			}
		#endregion
		#region BeginDispose
			/// <summary>
			/// Starts disposing this channel. The remote id is reset to -1. If received
			/// messages are still queued, the channel is only flagged as disposing so the
			/// remaining data can be read; otherwise it is disposed immediately.
			/// </summary>
			internal void _BeginDispose()
			{
				var pending = _inMessages;
				if (pending == null)
					return;

				// With _remoteId at -1, OnDispose() will not call _RemoveChannel.
				_remoteId = -1;

				lock(DisposeLock)
				{
					if (pending.Count != 0)
						_disposeBegan = true;
				}

				// Read() completes the disposal once the queue has been drained.
				if (_disposeBegan)
					return;

				Dispose();
			}
		#endregion
		#region _Collected
			/// <summary>
			/// Garbage-collection notification. While the channel is alive the queue of
			/// received messages is trimmed; once the channel was disposed it unregisters
			/// itself from further notifications.
			/// </summary>
			void IGarbageCollectionAware.OnCollected()
			{
				try
				{
					var pending = _inMessages;
					bool alive = !WasDisposed && pending != null;

					if (alive)
					{
						lock(DisposeLock)
						{
							pending.TrimExcess();
						}
					}
					else
						GCUtils.UnregisterFromCollectedNotification(this);
				}
				catch
				{
					// Best effort: every exception raised while trimming or unregistering is
					// intentionally ignored by this handler.
				}
			}
		#endregion
		
		#region Properties
			/// <summary>
			/// Gets the local endpoint of this channel, formed by the channeller's local
			/// endpoint followed by ":" and the local channel id.
			/// </summary>
			/// <exception cref="ObjectDisposedException">
			/// The channel was disposed and is no longer attached to a channeller.
			/// </exception>
			public string LocalEndpoint
			{
				get
				{
					// OnDispose() sets _channeller to null, so read the field once and
					// report disposal explicitly instead of failing with a NullReferenceException.
					var channeller = _channeller;
					if (channeller == null)
						throw new ObjectDisposedException(GetType().FullName);

					return channeller.LocalEndpoint + ":" + _id;
				}
			}

			/// <summary>
			/// Gets the remote endpoint of this channel, formed by the channeller's remote
			/// endpoint followed by ":" and the remote channel id.
			/// </summary>
			/// <exception cref="ObjectDisposedException">
			/// The channel was disposed and is no longer attached to a channeller.
			/// </exception>
			public string RemoteEndpoint
			{
				get
				{
					// OnDispose() sets _channeller to null, so read the field once and
					// report disposal explicitly instead of failing with a NullReferenceException.
					var channeller = _channeller;
					if (channeller == null)
						throw new ObjectDisposedException(GetType().FullName);

					return channeller.RemoteEndpoint + ":" + _remoteId;
				}
			}

			#region Id
				/// <summary>
				/// Gets the identifier this channel has on the local side.
				/// </summary>
				public int Id
				{
					get { return _id; }
				}
			#endregion
			#region RemoteId
				/// <summary>
				/// Gets the identifier the remote host gave to this channel. The value is -1
				/// after <c>_BeginDispose()</c> has run.
				/// </summary>
				public int RemoteId
				{
					get { return _remoteId; }
				}
			#endregion
		
			#region Channeller
				// Owning channeller. Set by the constructor and cleared (null) by OnDispose().
				internal StreamChanneller _channeller;
				
				/// <summary>
				/// Gets the channeller that owns this channel, or null once the channel
				/// has been disposed.
				/// </summary>
				public StreamChanneller Channeller
				{
					get { return _channeller; }
				}
			#endregion
			
			#region Length
				/// <summary>
				/// Stream member. A channel has no known length, so this always returns -1.
				/// </summary>
				public override long Length
				{
					get
					{
						return -1;
					}
				}
			#endregion
			#region Position
				/// <summary>
				/// Stream member. Reading it always yields -1; assigning it is not supported.
				/// </summary>
				/// <exception cref="NotSupportedException">Thrown whenever the property is set.</exception>
				public override long Position
				{
					get { return -1; }
					set { throw new NotSupportedException(); }
				}
			#endregion
			
			#region CanRead
				/// <summary>
				/// Stream member. Channels are readable, so this always returns true.
				/// </summary>
				public override bool CanRead
				{
					get
					{
						return true;
					}
				}
			#endregion
			#region CanSeek
				/// <summary>
				/// Stream member. Channels cannot seek, so this always returns false.
				/// </summary>
				public override bool CanSeek
				{
					get
					{
						return false;
					}
				}
			#endregion
			#region CanWrite
				/// <summary>
				/// Stream member. Channels are writable, so this always returns true.
				/// </summary>
				public override bool CanWrite
				{
					get
					{
						return true;
					}
				}
			#endregion

			#region CanTimeout
				/// <summary>
				/// Stream member. Always returns true: <see cref="ReadTimeout"/> and
				/// <see cref="WriteTimeout"/> are honoured by Read() and Flush().
				/// </summary>
				public override bool CanTimeout
				{
					get { return true; }
				}
			#endregion
			#region ReadTimeout
				// Milliseconds Read() waits for a message; infinite by default.
				private int _readTimeout = Timeout.Infinite;

				/// <summary>
				/// Gets or sets the read timeout of this channel, in milliseconds. It is the
				/// time Read() waits for incoming data before throwing a TimeoutException.
				/// </summary>
				public override int ReadTimeout
				{
					get { return _readTimeout; }
					set { _readTimeout = value; }
				}
			#endregion
			#region WriteTimeout
				// Milliseconds Flush() waits for the channeller's lock; infinite by default.
				private int _writeTimeout = Timeout.Infinite;

				/// <summary>
				/// Gets or sets the write timeout of this channel, in milliseconds. It is the
				/// time Flush() waits to acquire the channeller's lock before throwing a
				/// TimeoutException.
				/// </summary>
				public override int WriteTimeout
				{
					get { return _writeTimeout; }
					set { _writeTimeout = value; }
				}
			#endregion
		#endregion
		#region Methods
			#region Flush
				/// <summary>
				/// Sends all buffered data to the stream.
				/// </summary>
				/// <remarks>
				/// The buffered bytes are copied into a frame laid out as
				/// [4 bytes: local channel id][4 bytes: payload length][payload] and the frame
				/// is enqueued on the channeller's send queue. Any failure while building or
				/// enqueueing the frame disposes this channel with that exception before it
				/// is rethrown.
				/// </remarks>
				/// <exception cref="ObjectDisposedException">
				/// The channel is disposed or has begun disposing.
				/// </exception>
				/// <exception cref="TimeoutException">
				/// The channeller's lock could not be acquired within <see cref="WriteTimeout"/>.
				/// </exception>
				public override void Flush()
				{
					lock(DisposeLock)
					{
						CheckUndisposed();

						// ObjectDisposedException(string) takes the object name, not the message,
						// so the two-argument overload is required to surface this text as the message.
						if (_disposeBegan)
							throw new ObjectDisposedException(GetType().FullName, "The stream is already disposing. It is only possible to read the remaining bytes.");

						int count = _sendBufferPosition;
					
						if (count == 0)
							return;
						
						try
						{
							_sendBufferPosition = 0;
							
							// Frame header: channel id followed by payload length.
							// BitConverter emits the bytes in the platform's byte order.
							byte[] bufferCopy = new byte[count + 8];
							BitConverter.GetBytes(_id).CopyTo(bufferCopy, 0);
							BitConverter.GetBytes(count).CopyTo(bufferCopy, 4);
						
							Buffer.BlockCopy(_sendBuffer, 0, bufferCopy, 8, count);
						
							// OnDispose() clears _channeller; take a single snapshot so that every
							// step below works on the same channeller and the same lock object.
							var channeller = _channeller;
							if (channeller == null)
								throw new ObjectDisposedException(GetType().FullName);

							var channellerLock = channeller.DisposeLock;

							bool lockTaken = false;
							try
							{
								Monitor.TryEnter(channellerLock, _writeTimeout, ref lockTaken);
								if (!lockTaken)
									throw new TimeoutException("StreamChannel.Flush() timed-out.");

								channeller.CheckUndisposed();

								channeller._buffersToSend.Enqueue(bufferCopy);

								// Signal a thread waiting on the channeller's lock
								// (presumably the channeller's sender - confirm).
								Monitor.Pulse(channellerLock);
							}
							finally
							{
								if (lockTaken)
									Monitor.Exit(channellerLock);
							}
						}
						catch(Exception exception)
						{
							Dispose(exception);
							throw;
						}
					}
				}
			#endregion
			#region Read
				/// <summary>
				/// Reads bytes from the channel. At most the unread remainder of a single
				/// received message is returned per call.
				/// </summary>
				/// <remarks>
				/// When no message is being consumed, the call waits on <c>DisposeLock</c> for
				/// up to <see cref="ReadTimeout"/> milliseconds for one to arrive. Exceptions
				/// raised while waiting or copying (including the timeout) dispose the channel
				/// before being rethrown; argument errors are reported before any channel state
				/// is touched and do not dispose it.
				/// NOTE(review): _actualMessage and _positionInActualMessage are accessed
				/// outside the lock, which looks like it assumes a single reader - confirm.
				/// </remarks>
				/// <param name="buffer">The buffer to store the read data.</param>
				/// <param name="offset">The initial position to store data in the buffer.</param>
				/// <param name="count">The number of bytes expected to read.</param>
				/// <returns>
				/// The number of bytes actually read. 0 is returned when <paramref name="count"/>
				/// is 0, when the channel was disposed without an exception, or when disposal
				/// has begun and no received message remains.
				/// </returns>
				/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
				/// <exception cref="ArgumentOutOfRangeException">
				/// <paramref name="offset"/> or <paramref name="count"/> is negative.
				/// </exception>
				/// <exception cref="ArgumentException">
				/// <paramref name="offset"/> plus <paramref name="count"/> exceeds the buffer length.
				/// </exception>
				/// <exception cref="ObjectDisposedException">The channel was disposed with an exception.</exception>
				/// <exception cref="TimeoutException">No data arrived within <see cref="ReadTimeout"/>.</exception>
				public override int Read(byte[] buffer, int offset, int count)
				{
					// Kept as the very first check so zero-length reads behave exactly as before.
					if (count == 0)
						return 0;

					// Validate outside the try block: a caller mistake must not dispose the channel.
					if (buffer == null)
						throw new ArgumentNullException("buffer");

					if (offset < 0)
						throw new ArgumentOutOfRangeException("offset");

					if (count < 0)
						throw new ArgumentOutOfRangeException("count");

					if (buffer.Length - offset < count)
						throw new ArgumentException("The sum of offset and count is larger than the buffer length.");

					try
					{
						byte[] actualMessage = _actualMessage;
						if (actualMessage == null)
						{
							lock (DisposeLock)
							{
								while (true)
								{
									var disposeException = DisposeException;
									if (disposeException != null)
										throw new ObjectDisposedException("Object disposed: " + GetType().FullName, disposeException);

									if (WasDisposed)
										return 0;

									if (_inMessages.Count > 0)
									{
										actualMessage = _inMessages.Dequeue();
										_actualMessage = actualMessage;
										_positionInActualMessage = 0;
										break;
									}

									// Queue drained after _BeginDispose(): finish the disposal now.
									if (_disposeBegan)
									{
										Dispose();
										return 0;
									}

									if (!Monitor.Wait(DisposeLock, _readTimeout))
										throw new TimeoutException("Channel.Read() timed-out.");
								}
							}
						}

						int positionInActualMessage = _positionInActualMessage;
						int remainingLength = actualMessage.Length - positionInActualMessage;

						if (remainingLength <= count)
						{
							// The message is fully consumed by this call; the next call dequeues.
							count = remainingLength;
							_actualMessage = null;
						}
						else
							_positionInActualMessage = positionInActualMessage + count;

						Buffer.BlockCopy(actualMessage, positionInActualMessage, buffer, offset, count);
					}
					catch(Exception exception)
					{
						Dispose(exception);
						throw;
					}

					return count;
				}
			#endregion
			#region Write
				/// <summary>
				/// Writes bytes into this channel. The bytes are staged in the send buffer
				/// and <see cref="Flush"/> is invoked each time that buffer becomes full; any
				/// remainder stays buffered until the next flush.
				/// </summary>
				/// <param name="buffer">The buffer to get bytes to write.</param>
				/// <param name="offset">The initial position in the buffer to send.</param>
				/// <param name="count">The number of bytes from the buffer to send.</param>
				/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
				/// <exception cref="ArgumentOutOfRangeException">
				/// <paramref name="offset"/> or <paramref name="count"/> is negative.
				/// </exception>
				/// <exception cref="ArgumentException">
				/// <paramref name="offset"/> plus <paramref name="count"/> exceeds the buffer length.
				/// </exception>
				/// <exception cref="ObjectDisposedException">
				/// The channel is disposed or has begun disposing.
				/// </exception>
				public override void Write(byte[] buffer, int offset, int count)
				{
					if (buffer == null)
						throw new ArgumentNullException("buffer");

					// Reject bad ranges up front so that an invalid call can never leave a
					// partially copied (or partially flushed) payload behind.
					if (offset < 0)
						throw new ArgumentOutOfRangeException("offset");

					if (count < 0)
						throw new ArgumentOutOfRangeException("count");

					if (buffer.Length - offset < count)
						throw new ArgumentException("The sum of offset and count is larger than the buffer length.");

					lock(DisposeLock)
					{
						CheckUndisposed();

						// ObjectDisposedException(string) takes the object name, not the message,
						// so the two-argument overload is required to surface this text as the message.
						if (_disposeBegan)
							throw new ObjectDisposedException(GetType().FullName, "The stream is already disposing. It is only possible to read the remaining bytes.");
						
						int bufferSize = _sendBuffer.Length;

						while(count > 0)
						{
							// Copy as much as fits into the free part of the send buffer.
							int toCopy = Math.Min(bufferSize - _sendBufferPosition, count);

							Buffer.BlockCopy(buffer, offset, _sendBuffer, _sendBufferPosition, toCopy);
							_sendBufferPosition += toCopy;

							// Flush() re-enters DisposeLock on this same thread and resets _sendBufferPosition.
							if (_sendBufferPosition == bufferSize)
								Flush();

							count -= toCopy;
							offset += toCopy;
						}
					}
				}
			#endregion

			#region Seek
				/// <summary>
				/// Stream member. Seeking is not available on a channel.
				/// </summary>
				/// <param name="offset">Ignored.</param>
				/// <param name="origin">Ignored.</param>
				/// <returns>Never returns.</returns>
				/// <exception cref="NotSupportedException">Always thrown.</exception>
				public override long Seek(long offset, SeekOrigin origin)
				{
					throw new NotSupportedException();
				}
			#endregion
			#region SetLength
				/// <summary>
				/// Stream member. The length of a channel cannot be set.
				/// </summary>
				/// <param name="value">Ignored.</param>
				/// <exception cref="NotSupportedException">Always thrown.</exception>
				public override void SetLength(long value)
				{
					throw new NotSupportedException();
				}
			#endregion
		#endregion

		#region IChannel Members
			/// <summary>
			/// Explicit IChannel implementation: exposes the owning channeller (null once
			/// the channel has been disposed).
			/// </summary>
			IChanneller IChannel.Channeller
			{
				get { return _channeller; }
			}
			/// <summary>
			/// Explicit IChannel implementation: the channel is itself the stream.
			/// </summary>
			Stream IChannel.Stream
			{
				get { return this; }
			}
		#endregion
	}
}
